package backtype.storm.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.metric.api.IStatefulObject;

import com.lmax.disruptor.*;


/**
 * A single-consumer queue backed by the LMAX Disruptor ring buffer. The key to
 * its performance is the consumer's ability to catch up with the producers by
 * processing tuples in batches.
 *
 * <p>
 * Until the consumer is marked as started (see {@link #consumerStarted()}),
 * published objects are parked in a side cache instead of the ring buffer. Once
 * started, a {@code FLUSH_CACHE} marker is published so that the batch consumer
 * drains that cache in order.
 */
public class DisruptorQueue implements IStatefulObject {
    /** Marker event: tells the batch consumer to drain {@link #_cache}. */
    static final Object FLUSH_CACHE = new Object();
    /** Marker event: makes the batch consumer abort with an InterruptedException. */
    static final Object INTERRUPT = new Object();

    RingBuffer<MutableObject> _buffer;
    Sequence _consumer;
    SequenceBarrier _barrier;

    // TODO: consider having a threadlocal cache of this variable to speed up
    // reads?
    volatile boolean consumerStartedFlag = false;
    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();


    /**
     * Builds the ring buffer and registers the single consumer sequence as its
     * gating sequence.
     *
     * @param claim claim strategy handed to the ring buffer; when it is a
     *            {@code SingleThreadedClaimStrategy} the queue is treated as
     *            already started, so nothing is ever cached
     * @param wait wait strategy handed to the ring buffer
     */
    public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.setGatingSequences(_consumer);
        if (claim instanceof SingleThreadedClaimStrategy) {
            consumerStartedFlag = true;
        }
    }


    /**
     * Hands every event published so far to {@code handler} without waiting
     * for new ones.
     *
     * @param handler receiver of the consumed events
     */
    public void consumeBatch(EventHandler<Object> handler) {
        consumeBatchToCursor(_barrier.getCursor(), handler);
    }


    /**
     * Publishes the {@code INTERRUPT} marker; the batch consumer throws when it
     * reaches it.
     */
    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }


    /**
     * Removes and returns the next object without blocking.
     *
     * <p>
     * NOTE(review): marker events ({@code FLUSH_CACHE}, {@code INTERRUPT}) are
     * not interpreted here and are returned to the caller like any other
     * object - confirm callers expect that.
     *
     * @return the next object, or {@code null} when nothing is available
     */
    public Object poll() {
        // Before the consumer has started, everything published sits in the
        // cache. (_cache.isEmpty() would be the natural test here but was
        // judged too slow, so the started flag is used instead.)
        if (!consumerStartedFlag) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        if (nextSequence <= _barrier.getCursor()) {
            return removeAt(nextSequence);
        }
        return null;
    }


    /**
     * Removes and returns the next object, waiting on the sequence barrier
     * until one has been published.
     *
     * <p>
     * Before the consumer has started this does not wait: it behaves like
     * {@link #poll()} on the cache and may return {@code null}.
     *
     * @return the next object, or {@code null} when the wait was interrupted
     *         or (before start) the cache is empty
     * @throws RuntimeException wrapping an {@code AlertException} raised by
     *             the barrier
     */
    public Object take() {
        // Same pre-start shortcut as poll(); see the comment there.
        if (!consumerStartedFlag) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        try {
            _barrier.waitFor(nextSequence);
        }
        catch (AlertException e) {
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            // Best effort: an interrupted wait is reported as "nothing
            // available" rather than as an error.
            // NOTE(review): the interrupt status is not restored here; confirm
            // that callers do not rely on it before changing this.
            return null;
        }
        return removeAt(nextSequence);
    }


    /**
     * Waits until at least one event is available and then hands every
     * available event to {@code handler}. Returns silently when the wait is
     * interrupted.
     *
     * @param handler receiver of the consumed events
     * @throws RuntimeException wrapping an {@code AlertException} raised by
     *             the barrier, or anything thrown while consuming
     */
    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            // blocking all the time may cause deadlock : see
            // @NettyUnitTest.test_batch()
            // final long availableSequence = _barrier.waitFor(nextSequence, 10,
            // TimeUnit.MILLISECONDS);
            final long availableSequence = _barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                consumeBatchToCursor(availableSequence, handler);
            }
        }
        catch (AlertException e) {
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            // Best effort, same as take(): give up quietly.
            // NOTE(review): the interrupt status is not restored here.
            return;
        }
    }


    /**
     * Takes the payload out of the slot at {@code sequence} and then releases
     * the slot to the producers.
     *
     * <p>
     * The payload must be read and cleared BEFORE the consumer sequence is
     * advanced: the consumer sequence gates the producers, so once it moves a
     * producer may reuse the slot and overwrite its content. Clearing the slot
     * also avoids keeping the consumed object reachable from the ring buffer.
     */
    private Object removeAt(long sequence) {
        final MutableObject slot = _buffer.get(sequence);
        final Object payload = slot.o;
        slot.setObject(null);
        _consumer.set(sequence);
        return payload;
    }


    /**
     * Feeds every event from the current consumer position up to and
     * including {@code cursor} to {@code handler}, then advances the consumer
     * sequence to {@code cursor}.
     *
     * <p>
     * {@code FLUSH_CACHE} drains the cache into the handler; {@code INTERRUPT}
     * aborts the batch. Any exception (including the one raised for
     * {@code INTERRUPT}) is rethrown wrapped in a {@link RuntimeException}, in
     * which case the consumer sequence is not advanced.
     */
    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                // drop the reference so the slot does not pin the object
                mo.setObject(null);
                if (o == FLUSH_CACHE) {
                    Object cached;
                    while ((cached = _cache.poll()) != null) {
                        handler.onEvent(cached, curr, true);
                    }
                }
                else if (o == INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                }
                else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        // TODO: only set this if the consumer cursor has changed?
        _consumer.set(cursor);
    }


    /**
     * Publishes {@code obj}, waiting for ring-buffer space if necessary.
     * Objects are cached until {@link #consumerStarted()} is called, upon
     * which the cache is flushed to the consumer.
     *
     * @param obj the object to enqueue
     */
    public void publish(Object obj) {
        try {
            publish(obj, true);
        }
        catch (InsufficientCapacityException ex) {
            // The blocking path never signals insufficient capacity; keep the
            // cause anyway so that an impossible failure is still diagnosable.
            throw new RuntimeException("This code should be unreachable!", ex);
        }
    }


    /**
     * Publishes {@code obj} without waiting for ring-buffer space.
     *
     * @param obj the object to enqueue
     * @throws InsufficientCapacityException when the ring buffer has no free
     *             slot
     */
    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }


    /**
     * Publishes {@code obj} to the ring buffer, or to the cache while the
     * consumer has not started yet.
     *
     * @param obj the object to enqueue
     * @param block {@code true} to wait for a free slot, {@code false} to fail
     *            fast
     * @throws InsufficientCapacityException when {@code block} is
     *             {@code false} and the ring buffer has no free slot
     */
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if (consumerStartedFlag) {
            final long id;
            if (block) {
                id = _buffer.next();
            }
            else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        }
        else {
            _cache.add(obj);
            // The consumer may have started between the check above and the
            // add; re-check so this object is not stranded in the cache.
            if (consumerStartedFlag) {
                flushCache();
            }
        }
    }


    /**
     * Marks the consumer as started and, on the first call, asks it to drain
     * whatever was cached so far.
     */
    public void consumerStarted() {
        if (!consumerStartedFlag) {
            consumerStartedFlag = true;
            flushCache();
        }
    }


    /** Publishes the {@code FLUSH_CACHE} marker. */
    private void flushCache() {
        publish(FLUSH_CACHE);
    }


    /** Discards objects via {@link #poll()} until {@link #population()} is zero. */
    public void clear() {
        while (population() != 0L) {
            poll();
        }
    }


    /** @return write position minus read position */
    public long population() {
        return (writePos() - readPos());
    }


    /** @return the ring buffer size */
    public long capacity() {
        return _buffer.getBufferSize();
    }


    /** @return the ring buffer cursor */
    public long writePos() {
        return _buffer.getCursor();
    }


    /** @return the consumer sequence */
    public long readPos() {
        return _consumer.get();
    }


    /** @return {@link #population()} divided by {@link #capacity()} */
    public float pctFull() {
        return (1.0F * population() / capacity());
    }


    /**
     * Snapshot of the queue counters.
     *
     * @return a map with the keys {@code capacity}, {@code population},
     *         {@code write_pos} and {@code read_pos}
     */
    @Override
    public Object getState() {
        Map<String, Object> state = new HashMap<String, Object>();
        // get readPos then writePos so it's never an under-estimate
        long rp = readPos();
        long wp = writePos();
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    /** Pre-allocates the mutable holders that fill the ring buffer. */
    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        @Override
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }

}
